from __future__ import annotations

import os
import pathlib
import subprocess
import time
from datetime import date, datetime, timedelta, timezone
from decimal import Decimal
from time import sleep
from typing import TYPE_CHECKING, Generator

import pytest
from arro3.core import Array, DataType, Field, Schema, Table
from azure.storage import blob

from deltalake import DeltaTable, WriterProperties, write_deltalake

if TYPE_CHECKING:
    import pyarrow as pa
    from minio import Minio


def wait_till_host_is_available(host: str, timeout_sec: int = 0.5):
    spacing = 2
    start = time.monotonic()
    while True:
        if time.monotonic() - start > timeout_sec:
            raise TimeoutError(f"Host {host} is not available after {timeout_sec} sec")

        try:
            subprocess.run(["curl", host], timeout=timeout_sec * 1000, check=True)
        except Exception:
            pass
        else:
            break

        sleep(spacing)


@pytest.fixture(scope="session")
def s3_localstack_creds():
    endpoint_url = "http://localhost:4566"

    config = dict(
        AWS_REGION="us-east-1",
        AWS_ACCESS_KEY_ID="deltalake",
        AWS_SECRET_ACCESS_KEY="weloverust",
        AWS_ENDPOINT_URL=endpoint_url,
    )

    env = os.environ.copy()
    env.update(config)

    setup_commands = [
        [
            "aws",
            "s3api",
            "create-bucket",
            "--bucket",
            "deltars",
            "--endpoint-url",
            endpoint_url,
        ],
        [
            "aws",
            "s3",
            "sync",
            "--quiet",
            "../crates/test/tests/data/simple_table",
            "s3://deltars/simple",
            "--endpoint-url",
            endpoint_url,
        ],
    ]

    wait_till_host_is_available(endpoint_url)

    try:
        for args in setup_commands:
            subprocess.run(args, env=env)
    except OSError:
        pytest.skip("aws cli not installed")

    yield config

    shutdown_commands = [
        [
            "aws",
            "s3",
            "rm",
            "--quiet",
            "--recursive",
            "s3://deltars",
            "--endpoint-url",
            endpoint_url,
        ],
        [
            "aws",
            "s3api",
            "delete-bucket",
            "--bucket",
            "deltars",
            "--endpoint-url",
            endpoint_url,
        ],
    ]

    for args in shutdown_commands:
        subprocess.run(args, env=env)


@pytest.fixture()
def s3_localstack(monkeypatch, s3_localstack_creds):
    monkeypatch.setenv("AWS_ALLOW_HTTP", "TRUE")
    for key, value in s3_localstack_creds.items():
        monkeypatch.setenv(key, value)


@pytest.fixture(scope="session")
def azurite_creds():
    # These are the well-known values
    # https://learn.microsoft.com/en-us/azure/storage/common/storage-use-azurite?tabs=visual-studio#well-known-storage-account-and-key
    account_name = "devstoreaccount1"
    config = dict(
        AZURE_STORAGE_ACCOUNT_NAME=account_name,
        AZURE_STORAGE_ACCOUNT_KEY="Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==",
        AZURE_STORAGE_CONTAINER_NAME="deltars",
        AZURE_STORAGE_USE_EMULATOR="true",
        AZURE_STORAGE_USE_HTTP="true",
        AZURE_STORAGE_ENDPOINT=f"http://localhost:10000/{account_name}",
    )

    env = os.environ.copy()
    env.update(config)
    conn_str = (
        "DefaultEndpointsProtocol=http;"
        f"AccountName={config['AZURE_STORAGE_ACCOUNT_NAME']};"
        f"AccountKey={config['AZURE_STORAGE_ACCOUNT_KEY']};"
        f"BlobEndpoint={config['AZURE_STORAGE_ENDPOINT']};"
    )
    env["AZURE_STORAGE_CONNECTION_STRING"] = conn_str
    wait_till_host_is_available(config["AZURE_STORAGE_ENDPOINT"])
    try:
        blob_client = blob.BlobServiceClient.from_connection_string(conn_str=conn_str)
        container = blob_client.create_container(
            name=config["AZURE_STORAGE_CONTAINER_NAME"]
        )
        yield config
    finally:
        container.delete_container()


@pytest.fixture()
def azurite_env_vars(monkeypatch, azurite_creds):
    for key, value in azurite_creds.items():
        monkeypatch.setenv(key, value)


@pytest.fixture(scope="session")
def azurite_sas_creds(azurite_creds):
    env = os.environ.copy()
    env.update(azurite_creds)
    env["AZURE_STORAGE_CONNECTION_STRING"] = (
        "DefaultEndpointsProtocol=http;"
        f"AccountName={azurite_creds['AZURE_STORAGE_ACCOUNT_NAME']};"
        f"AccountKey={azurite_creds['AZURE_STORAGE_ACCOUNT_KEY']};"
        f"BlobEndpoint={azurite_creds['AZURE_STORAGE_ENDPOINT']};"
    )
    sas_token = blob.generate_container_sas(
        account_name=azurite_creds["AZURE_STORAGE_ACCOUNT_NAME"],
        container_name=azurite_creds["AZURE_STORAGE_CONTAINER_NAME"],
        account_key=azurite_creds["AZURE_STORAGE_ACCOUNT_KEY"],
        permission=blob.ContainerSasPermissions(
            read=True,
            write=True,
            list=True,
            delete=True,
        ),
        expiry=datetime.now(tz=timezone.utc) + timedelta(hours=1),
        start=datetime.now(tz=timezone.utc),
        protocol="http",
    )
    creds = {key: value for key, value in azurite_creds.items() if "KEY" not in key}
    creds["SAS_TOKEN"] = sas_token

    return creds


@pytest.fixture()
def sample_data_pyarrow() -> "pa.Table":
    nrows = 5
    import pyarrow as pa

    return pa.table(
        {
            "utf8": pa.array([str(x) for x in range(nrows)]),
            "int64": pa.array(list(range(nrows)), pa.int64()),
            "int32": pa.array(list(range(nrows)), pa.int32()),
            "int16": pa.array(list(range(nrows)), pa.int16()),
            "int8": pa.array(list(range(nrows)), pa.int8()),
            "float32": pa.array([float(x) for x in range(nrows)], pa.float32()),
            "float64": pa.array([float(x) for x in range(nrows)], pa.float64()),
            "bool": pa.array([x % 2 == 0 for x in range(nrows)]),
            "binary": pa.array([str(x).encode() for x in range(nrows)]),
            "decimal": pa.array([Decimal("10.000") + x for x in range(nrows)]),
            "date32": pa.array(
                [date(2022, 1, 1) + timedelta(days=x) for x in range(nrows)]
            ),
            "timestamp": pa.array(
                [datetime(2022, 1, 1) + timedelta(hours=x) for x in range(nrows)]
            ),
            "struct": pa.array([{"x": x, "y": str(x)} for x in range(nrows)]),
            "list": pa.array(
                [list(range(x + 1)) for x in range(nrows)],
                type=pa.list_(pa.field("inner", pa.int64(), nullable=False)),
            ),
            # NOTE: https://github.com/apache/arrow-rs/issues/477
            #'map': pa.array([[(str(y), y) for y in range(x)] for x in range(nrows)], pa.map_(pa.string(), pa.int64())),
        }
    )


@pytest.fixture()
def existing_table(tmp_path: pathlib.Path, sample_data_pyarrow: "pa.Table"):
    path = str(tmp_path)
    write_deltalake(path, sample_data_pyarrow)
    return DeltaTable(path)


@pytest.fixture()
def sample_table() -> Table:
    nrows = 5
    return Table(
        {
            "id": Array(
                ["1", "2", "3", "4", "5"],
                Field("id", type=DataType.string(), nullable=True),
            ),
            "price": Array(
                list(range(nrows)), Field("price", type=DataType.int64(), nullable=True)
            ),
            "sold": Array(
                list(range(nrows)), Field("sold", type=DataType.int32(), nullable=True)
            ),
            "deleted": Array(
                [False] * nrows, Field("deleted", type=DataType.bool(), nullable=True)
            ),
        },
    )


@pytest.fixture()
def existing_sample_table(tmp_path: pathlib.Path, sample_table: Table):
    path = str(tmp_path)
    write_deltalake(path, sample_table)
    return DeltaTable(path)


@pytest.fixture()
def sample_table_with_spaces_numbers() -> Table:
    nrows = 5
    return Table.from_pydict(
        {
            "1id": Array(["1", "2", "3", "4", "5"], DataType.string()),
            "price": Array(list(range(nrows)), DataType.int64()),
            "sold items": Array(list(range(nrows)), DataType.int32()),
            "deleted": Array(
                [False] * nrows,
                DataType.bool(),
            ),
        },
        schema=Schema(
            fields=[
                Field("1id", type=DataType.string(), nullable=True),
                Field("price", type=DataType.int64(), nullable=True),
                Field("sold items", type=DataType.int32(), nullable=True),
                Field("deleted", type=DataType.bool(), nullable=True),
            ]
        ),
    )


@pytest.fixture(scope="session")
def minio_container() -> Generator[tuple[dict, Minio], None, None]:
    """Start a MinIO test container for S3-compatible object storage."""
    from testcontainers.minio import MinioContainer

    container = MinioContainer(
        image="minio/minio:latest",
        access_key="minioadmin",
        secret_key="minioadmin",
    )
    container.start()

    client = container.get_client()

    container_config = container.get_config()

    config = {
        "AWS_REGION": "us-east-1",
        "AWS_ACCESS_KEY_ID": container_config["access_key"],
        "AWS_SECRET_ACCESS_KEY": container_config["secret_key"],
        "AWS_ENDPOINT_URL": "http://" + container_config["endpoint"],
        "AWS_ALLOW_HTTP": "TRUE",
        "AWS_S3_ALLOW_UNSAFE_RENAME": "true",
    }

    yield (config, client)

    container.stop()


@pytest.fixture()
def minio_s3_env(monkeypatch, minio_container):
    """Set up environment variables for MinIO S3-compatible storage."""
    for key, value in minio_container.items():
        monkeypatch.setenv(key, value)
    return minio_container


@pytest.fixture()
def writer_properties():
    return WriterProperties(compression="GZIP", compression_level=0)
